Notes on Kafka Streams

By Shreyas Kaundinya

Apache Kafka

Meaning of at-least once, at-most once and exactly-once delivery

Architecture

Kafka Streams Architecture

Introduction

Concepts

Processors in topology

Source Processor

Sink Processor

Time

Event Time

Processing Time

Ingestion Time

Kafka Streams assigns a timestamp to every record via the TimestampExtractor
interface
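As a rough illustration of what a custom extractor usually decides, here is a minimal plain-Java sketch. It does not use the Kafka API itself: in Kafka Streams this logic would live inside `TimestampExtractor.extract(ConsumerRecord<Object, Object> record, long partitionTime)`. The class name `EventTimeSketch`, the payload shape (a `Map`), and the field name `eventTimeMs` are assumptions made only for this example.

```java
import java.util.Map;

public class EventTimeSketch {
    // Hypothetical payload field that carries the event time in epoch milliseconds.
    static final String EVENT_TIME_FIELD = "eventTimeMs";

    /**
     * Returns the event time embedded in the payload when it is present and
     * non-negative; otherwise falls back to partitionTime, which stands in for
     * the highest valid timestamp seen so far on the record's partition.
     */
    static long extract(Map<String, ?> payload, long partitionTime) {
        Object raw = payload.get(EVENT_TIME_FIELD);
        if (raw instanceof Number) {
            long ts = ((Number) raw).longValue();
            if (ts >= 0) {
                return ts;
            }
        }
        return partitionTime;
    }

    public static void main(String[] args) {
        // Payload with an embedded event time: the embedded value is used.
        System.out.println(extract(Map.of(EVENT_TIME_FIELD, 1_700_000_000_000L), 42L));
        // Payload without the field: the fallback value is used.
        System.out.println(extract(Map.of("other", "x"), 42L));
    }
}
```

Falling back to the partition time is one possible policy; an extractor could also throw, or return a negative value to mark the record's timestamp as invalid, depending on how strict the application needs to be.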

Whenever Kafka Streams writes records to Kafka, it also assigns timestamps to
these new records

Timestamps depend on context

For aggregations & joins, timestamps are computed by using the following rules

Streams & Tables

Stream as table

Table as a stream
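The two directions above can be sketched in plain Java, without the Kafka API. This is only a conceptual model under simple assumptions: a "stream" is a list of key/value change records, a "table" is a map holding the latest value per key, and the class and method names (`DualitySketch`, `toTable`, `toStream`) are made up for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DualitySketch {
    // Stream as table: replay the change records in order; the latest value per key wins.
    static Map<String, Integer> toTable(List<Map.Entry<String, Integer>> changelog) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> change : changelog) {
            table.put(change.getKey(), change.getValue());
        }
        return table;
    }

    // Table as a stream: emit every current row as one change record.
    static List<Map.Entry<String, Integer>> toStream(Map<String, Integer> table) {
        List<Map.Entry<String, Integer>> stream = new ArrayList<>();
        for (Map.Entry<String, Integer> row : table.entrySet()) {
            stream.add(Map.entry(row.getKey(), row.getValue()));
        }
        return stream;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> changelog = List.of(
                Map.entry("alice", 1),
                Map.entry("bob", 5),
                Map.entry("alice", 3)); // later update for the same key
        Map<String, Integer> table = toTable(changelog);
        System.out.println(table);
        System.out.println(toStream(table));
    }
}
```

Replaying the three records leaves two rows in the table, because the second record for `alice` overwrites the first. Turning the table back into a stream yields one record per row, which is a snapshot of the current state and not the full history.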

Use : KStream, KTable, GlobalKTable

Aggregations

Windowing

Windowing Types :

States

Processing Guarantees

Stream Partitions and Tasks

Intro

Sample Code Snippets

Just a simple Kafka Streams Class

package org.example.streamconsumer;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class StreamConsumer {
    // Log under this class's own name (the original used Consumer.class)
    private static final Logger log = LoggerFactory.getLogger(StreamConsumer.class);
    private final KafkaStreams kafkaStreams;

    public void run() {
        // Starts the stream processing threads; this call does not block
        this.kafkaStreams.start();
    }

    public StreamConsumer(String topic) {
        // Builder used to define the topology for Streams
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // Source processor: read records from the given topic
        KStream<String, String> listeningStream = streamsBuilder.stream(topic);

        // Terminal operation: log the value of every record
        listeningStream.foreach(
                (key, value) -> log.info("[StreamConsumer] Got message : {}", value)
        );

        // Build the topology from the graph defined above
        Topology topology = streamsBuilder.build();

        // Properties for the Streams config
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "shreyas.demo.app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093,localhost:9094,localhost:9095");
        // Default serdes: keys and values are read and written as Strings
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Instantiate the Streams client (processing begins only after run())
        this.kafkaStreams = new KafkaStreams(topology, props);
    }
}
